home *** CD-ROM | disk | FTP | other *** search
/ Chip 2007 January, February, March & April / Chip-Cover-CD-2007-02.iso / Pakiet bezpieczenstwa / mini Pentoo LiveCD 2006.1 / mpentoo-2006.1.iso / livecd.squashfs / usr / lib / python2.4 / idlelib / rpc.py < prev    next >
Text File  |  2005-10-18  |  20KB  |  596 lines

  1. """RPC Implemention, originally written for the Python Idle IDE
  2.  
  3. For security reasons, GvR requested that Idle's Python execution server process
  4. connect to the Idle process, which listens for the connection.  Since Idle has
  5. has only one client per server, this was not a limitation.
  6.  
  7.    +---------------------------------+ +-------------+
  8.    | SocketServer.BaseRequestHandler | | SocketIO    |
  9.    +---------------------------------+ +-------------+
  10.                    ^                   | register()  |
  11.                    |                   | unregister()|
  12.                    |                   +-------------+
  13.                    |                      ^  ^
  14.                    |                      |  |
  15.                    | + -------------------+  |
  16.                    | |                       |
  17.    +-------------------------+        +-----------------+
  18.    | RPCHandler              |        | RPCClient       |
  19.    | [attribute of RPCServer]|        |                 |
  20.    +-------------------------+        +-----------------+
  21.  
  22. The RPCServer handler class is expected to provide register/unregister methods.
  23. RPCHandler inherits the mix-in class SocketIO, which provides these methods.
  24.  
  25. See the Idle run.main() docstring for further information on how this was
  26. accomplished in Idle.
  27.  
  28. """
  29.  
  30. import sys
  31. import os
  32. import socket
  33. import select
  34. import SocketServer
  35. import struct
  36. import cPickle as pickle
  37. import threading
  38. import Queue
  39. import traceback
  40. import copy_reg
  41. import types
  42. import marshal
  43.  
  44.  
  45. def unpickle_code(ms):
  46.     co = marshal.loads(ms)
  47.     assert isinstance(co, types.CodeType)
  48.     return co
  49.  
  50. def pickle_code(co):
  51.     assert isinstance(co, types.CodeType)
  52.     ms = marshal.dumps(co)
  53.     return unpickle_code, (ms,)
  54.  
  55. # XXX KBK 24Aug02 function pickling capability not used in Idle
  56. #  def unpickle_function(ms):
  57. #      return ms
  58.  
  59. #  def pickle_function(fn):
  60. #      assert isinstance(fn, type.FunctionType)
  61. #      return repr(fn)
  62.  
  63. copy_reg.pickle(types.CodeType, pickle_code, unpickle_code)
  64. # copy_reg.pickle(types.FunctionType, pickle_function, unpickle_function)
  65.  
  66. BUFSIZE = 8*1024
  67. LOCALHOST = '127.0.0.1'
  68.  
  69. class RPCServer(SocketServer.TCPServer):
  70.  
  71.     def __init__(self, addr, handlerclass=None):
  72.         if handlerclass is None:
  73.             handlerclass = RPCHandler
  74.         SocketServer.TCPServer.__init__(self, addr, handlerclass)
  75.  
  76.     def server_bind(self):
  77.         "Override TCPServer method, no bind() phase for connecting entity"
  78.         pass
  79.  
  80.     def server_activate(self):
  81.         """Override TCPServer method, connect() instead of listen()
  82.  
  83.         Due to the reversed connection, self.server_address is actually the
  84.         address of the Idle Client to which we are connecting.
  85.  
  86.         """
  87.         self.socket.connect(self.server_address)
  88.  
  89.     def get_request(self):
  90.         "Override TCPServer method, return already connected socket"
  91.         return self.socket, self.server_address
  92.  
  93.     def handle_error(self, request, client_address):
  94.         """Override TCPServer method
  95.  
  96.         Error message goes to __stderr__.  No error message if exiting
  97.         normally or socket raised EOF.  Other exceptions not handled in
  98.         server code will cause os._exit.
  99.  
  100.         """
  101.         try:
  102.             raise
  103.         except SystemExit:
  104.             raise
  105.         except:
  106.             erf = sys.__stderr__
  107.             print>>erf, '\n' + '-'*40
  108.             print>>erf, 'Unhandled server exception!'
  109.             print>>erf, 'Thread: %s' % threading.currentThread().getName()
  110.             print>>erf, 'Client Address: ', client_address
  111.             print>>erf, 'Request: ', repr(request)
  112.             traceback.print_exc(file=erf)
  113.             print>>erf, '\n*** Unrecoverable, server exiting!'
  114.             print>>erf, '-'*40
  115.             os._exit(0)
  116.  
  117. #----------------- end class RPCServer --------------------
  118.  
  119. objecttable = {}
  120. request_queue = Queue.Queue(0)
  121. response_queue = Queue.Queue(0)
  122.  
  123.  
  124. class SocketIO:
  125.  
  126.     nextseq = 0
  127.  
  128.     def __init__(self, sock, objtable=None, debugging=None):
  129.         self.sockthread = threading.currentThread()
  130.         if debugging is not None:
  131.             self.debugging = debugging
  132.         self.sock = sock
  133.         if objtable is None:
  134.             objtable = objecttable
  135.         self.objtable = objtable
  136.         self.responses = {}
  137.         self.cvars = {}
  138.  
  139.     def close(self):
  140.         sock = self.sock
  141.         self.sock = None
  142.         if sock is not None:
  143.             sock.close()
  144.  
  145.     def exithook(self):
  146.         "override for specific exit action"
  147.         os._exit()
  148.  
  149.     def debug(self, *args):
  150.         if not self.debugging:
  151.             return
  152.         s = self.location + " " + str(threading.currentThread().getName())
  153.         for a in args:
  154.             s = s + " " + str(a)
  155.         print>>sys.__stderr__, s
  156.  
  157.     def register(self, oid, object):
  158.         self.objtable[oid] = object
  159.  
  160.     def unregister(self, oid):
  161.         try:
  162.             del self.objtable[oid]
  163.         except KeyError:
  164.             pass
  165.  
  166.     def localcall(self, seq, request):
  167.         self.debug("localcall:", request)
  168.         try:
  169.             how, (oid, methodname, args, kwargs) = request
  170.         except TypeError:
  171.             return ("ERROR", "Bad request format")
  172.         if not self.objtable.has_key(oid):
  173.             return ("ERROR", "Unknown object id: %r" % (oid,))
  174.         obj = self.objtable[oid]
  175.         if methodname == "__methods__":
  176.             methods = {}
  177.             _getmethods(obj, methods)
  178.             return ("OK", methods)
  179.         if methodname == "__attributes__":
  180.             attributes = {}
  181.             _getattributes(obj, attributes)
  182.             return ("OK", attributes)
  183.         if not hasattr(obj, methodname):
  184.             return ("ERROR", "Unsupported method name: %r" % (methodname,))
  185.         method = getattr(obj, methodname)
  186.         try:
  187.             if how == 'CALL':
  188.                 ret = method(*args, **kwargs)
  189.                 if isinstance(ret, RemoteObject):
  190.                     ret = remoteref(ret)
  191.                 return ("OK", ret)
  192.             elif how == 'QUEUE':
  193.                 request_queue.put((seq, (method, args, kwargs)))
  194.                 return("QUEUED", None)
  195.             else:
  196.                 return ("ERROR", "Unsupported message type: %s" % how)
  197.         except SystemExit:
  198.             raise
  199.         except socket.error:
  200.             raise
  201.         except:
  202.             self.debug("localcall:EXCEPTION")
  203.             traceback.print_exc(file=sys.__stderr__)
  204.             return ("EXCEPTION", None)
  205.  
  206.     def remotecall(self, oid, methodname, args, kwargs):
  207.         self.debug("remotecall:asynccall: ", oid, methodname)
  208.         seq = self.asynccall(oid, methodname, args, kwargs)
  209.         return self.asyncreturn(seq)
  210.  
  211.     def remotequeue(self, oid, methodname, args, kwargs):
  212.         self.debug("remotequeue:asyncqueue: ", oid, methodname)
  213.         seq = self.asyncqueue(oid, methodname, args, kwargs)
  214.         return self.asyncreturn(seq)
  215.  
  216.     def asynccall(self, oid, methodname, args, kwargs):
  217.         request = ("CALL", (oid, methodname, args, kwargs))
  218.         seq = self.newseq()
  219.         if threading.currentThread() != self.sockthread:
  220.             cvar = threading.Condition()
  221.             self.cvars[seq] = cvar
  222.         self.debug(("asynccall:%d:" % seq), oid, methodname, args, kwargs)
  223.         self.putmessage((seq, request))
  224.         return seq
  225.  
  226.     def asyncqueue(self, oid, methodname, args, kwargs):
  227.         request = ("QUEUE", (oid, methodname, args, kwargs))
  228.         seq = self.newseq()
  229.         if threading.currentThread() != self.sockthread:
  230.             cvar = threading.Condition()
  231.             self.cvars[seq] = cvar
  232.         self.debug(("asyncqueue:%d:" % seq), oid, methodname, args, kwargs)
  233.         self.putmessage((seq, request))
  234.         return seq
  235.  
  236.     def asyncreturn(self, seq):
  237.         self.debug("asyncreturn:%d:call getresponse(): " % seq)
  238.         response = self.getresponse(seq, wait=0.05)
  239.         self.debug(("asyncreturn:%d:response: " % seq), response)
  240.         return self.decoderesponse(response)
  241.  
  242.     def decoderesponse(self, response):
  243.         how, what = response
  244.         if how == "OK":
  245.             return what
  246.         if how == "QUEUED":
  247.             return None
  248.         if how == "EXCEPTION":
  249.             self.debug("decoderesponse: EXCEPTION")
  250.             return None
  251.         if how == "EOF":
  252.             self.debug("decoderesponse: EOF")
  253.             self.decode_interrupthook()
  254.             return None
  255.         if how == "ERROR":
  256.             self.debug("decoderesponse: Internal ERROR:", what)
  257.             raise RuntimeError, what
  258.         raise SystemError, (how, what)
  259.  
  260.     def decode_interrupthook(self):
  261.         ""
  262.         raise EOFError
  263.  
  264.     def mainloop(self):
  265.         """Listen on socket until I/O not ready or EOF
  266.  
  267.         pollresponse() will loop looking for seq number None, which
  268.         never comes, and exit on EOFError.
  269.  
  270.         """
  271.         try:
  272.             self.getresponse(myseq=None, wait=0.05)
  273.         except EOFError:
  274.             self.debug("mainloop:return")
  275.             return
  276.  
  277.     def getresponse(self, myseq, wait):
  278.         response = self._getresponse(myseq, wait)
  279.         if response is not None:
  280.             how, what = response
  281.             if how == "OK":
  282.                 response = how, self._proxify(what)
  283.         return response
  284.  
  285.     def _proxify(self, obj):
  286.         if isinstance(obj, RemoteProxy):
  287.             return RPCProxy(self, obj.oid)
  288.         if isinstance(obj, types.ListType):
  289.             return map(self._proxify, obj)
  290.         # XXX Check for other types -- not currently needed
  291.         return obj
  292.  
  293.     def _getresponse(self, myseq, wait):
  294.         self.debug("_getresponse:myseq:", myseq)
  295.         if threading.currentThread() is self.sockthread:
  296.             # this thread does all reading of requests or responses
  297.             while 1:
  298.                 response = self.pollresponse(myseq, wait)
  299.                 if response is not None:
  300.                     return response
  301.         else:
  302.             # wait for notification from socket handling thread
  303.             cvar = self.cvars[myseq]
  304.             cvar.acquire()
  305.             while not self.responses.has_key(myseq):
  306.                 cvar.wait()
  307.             response = self.responses[myseq]
  308.             self.debug("_getresponse:%s: thread woke up: response: %s" %
  309.                        (myseq, response))
  310.             del self.responses[myseq]
  311.             del self.cvars[myseq]
  312.             cvar.release()
  313.             return response
  314.  
  315.     def newseq(self):
  316.         self.nextseq = seq = self.nextseq + 2
  317.         return seq
  318.  
  319.     def putmessage(self, message):
  320.         self.debug("putmessage:%d:" % message[0])
  321.         try:
  322.             s = pickle.dumps(message)
  323.         except pickle.PicklingError:
  324.             print >>sys.__stderr__, "Cannot pickle:", repr(message)
  325.             raise
  326.         s = struct.pack("<i", len(s)) + s
  327.         while len(s) > 0:
  328.             try:
  329.                 r, w, x = select.select([], [self.sock], [])
  330.                 n = self.sock.send(s[:BUFSIZE])
  331.             except (AttributeError, socket.error):
  332.                 # socket was closed
  333.                 raise IOError
  334.             else:
  335.                 s = s[n:]
  336.  
  337.     buffer = ""
  338.     bufneed = 4
  339.     bufstate = 0 # meaning: 0 => reading count; 1 => reading data
  340.  
  341.     def pollpacket(self, wait):
  342.         self._stage0()
  343.         if len(self.buffer) < self.bufneed:
  344.             r, w, x = select.select([self.sock.fileno()], [], [], wait)
  345.             if len(r) == 0:
  346.                 return None
  347.             try:
  348.                 s = self.sock.recv(BUFSIZE)
  349.             except socket.error:
  350.                 raise EOFError
  351.             if len(s) == 0:
  352.                 raise EOFError
  353.             self.buffer += s
  354.             self._stage0()
  355.         return self._stage1()
  356.  
  357.     def _stage0(self):
  358.         if self.bufstate == 0 and len(self.buffer) >= 4:
  359.             s = self.buffer[:4]
  360.             self.buffer = self.buffer[4:]
  361.             self.bufneed = struct.unpack("<i", s)[0]
  362.             self.bufstate = 1
  363.  
  364.     def _stage1(self):
  365.         if self.bufstate == 1 and len(self.buffer) >= self.bufneed:
  366.             packet = self.buffer[:self.bufneed]
  367.             self.buffer = self.buffer[self.bufneed:]
  368.             self.bufneed = 4
  369.             self.bufstate = 0
  370.             return packet
  371.  
  372.     def pollmessage(self, wait):
  373.         packet = self.pollpacket(wait)
  374.         if packet is None:
  375.             return None
  376.         try:
  377.             message = pickle.loads(packet)
  378.         except pickle.UnpicklingError:
  379.             print >>sys.__stderr__, "-----------------------"
  380.             print >>sys.__stderr__, "cannot unpickle packet:", repr(packet)
  381.             traceback.print_stack(file=sys.__stderr__)
  382.             print >>sys.__stderr__, "-----------------------"
  383.             raise
  384.         return message
  385.  
  386.     def pollresponse(self, myseq, wait):
  387.         """Handle messages received on the socket.
  388.  
  389.         Some messages received may be asynchronous 'call' or 'queue' requests,
  390.         and some may be responses for other threads.
  391.  
  392.         'call' requests are passed to self.localcall() with the expectation of
  393.         immediate execution, during which time the socket is not serviced.
  394.  
  395.         'queue' requests are used for tasks (which may block or hang) to be
  396.         processed in a different thread.  These requests are fed into
  397.         request_queue by self.localcall().  Responses to queued requests are
  398.         taken from response_queue and sent across the link with the associated
  399.         sequence numbers.  Messages in the queues are (sequence_number,
  400.         request/response) tuples and code using this module removing messages
  401.         from the request_queue is responsible for returning the correct
  402.         sequence number in the response_queue.
  403.  
  404.         pollresponse() will loop until a response message with the myseq
  405.         sequence number is received, and will save other responses in
  406.         self.responses and notify the owning thread.
  407.  
  408.         """
  409.         while 1:
  410.             # send queued response if there is one available
  411.             try:
  412.                 qmsg = response_queue.get(0)
  413.             except Queue.Empty:
  414.                 pass
  415.             else:
  416.                 seq, response = qmsg
  417.                 message = (seq, ('OK', response))
  418.                 self.putmessage(message)
  419.             # poll for message on link
  420.             try:
  421.                 message = self.pollmessage(wait)
  422.                 if message is None:  # socket not ready
  423.                     return None
  424.             except EOFError:
  425.                 self.handle_EOF()
  426.                 return None
  427.             except AttributeError:
  428.                 return None
  429.             seq, resq = message
  430.             how = resq[0]
  431.             self.debug("pollresponse:%d:myseq:%s" % (seq, myseq))
  432.             # process or queue a request
  433.             if how in ("CALL", "QUEUE"):
  434.                 self.debug("pollresponse:%d:localcall:call:" % seq)
  435.                 response = self.localcall(seq, resq)
  436.                 self.debug("pollresponse:%d:localcall:response:%s"
  437.                            % (seq, response))
  438.                 if how == "CALL":
  439.                     self.putmessage((seq, response))
  440.                 elif how == "QUEUE":
  441.                     # don't acknowledge the 'queue' request!
  442.                     pass
  443.                 continue
  444.             # return if completed message transaction
  445.             elif seq == myseq:
  446.                 return resq
  447.             # must be a response for a different thread:
  448.             else:
  449.                 cv = self.cvars.get(seq, None)
  450.                 # response involving unknown sequence number is discarded,
  451.                 # probably intended for prior incarnation of server
  452.                 if cv is not None:
  453.                     cv.acquire()
  454.                     self.responses[seq] = resq
  455.                     cv.notify()
  456.                     cv.release()
  457.                 continue
  458.  
  459.     def handle_EOF(self):
  460.         "action taken upon link being closed by peer"
  461.         self.EOFhook()
  462.         self.debug("handle_EOF")
  463.         for key in self.cvars:
  464.             cv = self.cvars[key]
  465.             cv.acquire()
  466.             self.responses[key] = ('EOF', None)
  467.             cv.notify()
  468.             cv.release()
  469.         # call our (possibly overridden) exit function
  470.         self.exithook()
  471.  
  472.     def EOFhook(self):
  473.         "Classes using rpc client/server can override to augment EOF action"
  474.         pass
  475.  
  476. #----------------- end class SocketIO --------------------
  477.  
  478. class RemoteObject:
  479.     # Token mix-in class
  480.     pass
  481.  
  482. def remoteref(obj):
  483.     oid = id(obj)
  484.     objecttable[oid] = obj
  485.     return RemoteProxy(oid)
  486.  
  487. class RemoteProxy:
  488.  
  489.     def __init__(self, oid):
  490.         self.oid = oid
  491.  
  492. class RPCHandler(SocketServer.BaseRequestHandler, SocketIO):
  493.  
  494.     debugging = False
  495.     location = "#S"  # Server
  496.  
  497.     def __init__(self, sock, addr, svr):
  498.         svr.current_handler = self ## cgt xxx
  499.         SocketIO.__init__(self, sock)
  500.         SocketServer.BaseRequestHandler.__init__(self, sock, addr, svr)
  501.  
  502.     def handle(self):
  503.         "handle() method required by SocketServer"
  504.         self.mainloop()
  505.  
  506.     def get_remote_proxy(self, oid):
  507.         return RPCProxy(self, oid)
  508.  
  509. class RPCClient(SocketIO):
  510.  
  511.     debugging = False
  512.     location = "#C"  # Client
  513.  
  514.     nextseq = 1 # Requests coming from the client are odd numbered
  515.  
  516.     def __init__(self, address, family=socket.AF_INET, type=socket.SOCK_STREAM):
  517.         self.listening_sock = socket.socket(family, type)
  518.         self.listening_sock.setsockopt(socket.SOL_SOCKET,
  519.                                        socket.SO_REUSEADDR, 1)
  520.         self.listening_sock.bind(address)
  521.         self.listening_sock.listen(1)
  522.  
  523.     def accept(self):
  524.         working_sock, address = self.listening_sock.accept()
  525.         if self.debugging:
  526.             print>>sys.__stderr__, "****** Connection request from ", address
  527.         if address[0] == LOCALHOST:
  528.             SocketIO.__init__(self, working_sock)
  529.         else:
  530.             print>>sys.__stderr__, "** Invalid host: ", address
  531.             raise socket.error
  532.  
  533.     def get_remote_proxy(self, oid):
  534.         return RPCProxy(self, oid)
  535.  
  536. class RPCProxy:
  537.  
  538.     __methods = None
  539.     __attributes = None
  540.  
  541.     def __init__(self, sockio, oid):
  542.         self.sockio = sockio
  543.         self.oid = oid
  544.  
  545.     def __getattr__(self, name):
  546.         if self.__methods is None:
  547.             self.__getmethods()
  548.         if self.__methods.get(name):
  549.             return MethodProxy(self.sockio, self.oid, name)
  550.         if self.__attributes is None:
  551.             self.__getattributes()
  552.         if not self.__attributes.has_key(name):
  553.             raise AttributeError, name
  554.  
  555.     def __getattributes(self):
  556.         self.__attributes = self.sockio.remotecall(self.oid,
  557.                                                 "__attributes__", (), {})
  558.  
  559.     def __getmethods(self):
  560.         self.__methods = self.sockio.remotecall(self.oid,
  561.                                                 "__methods__", (), {})
  562.  
  563. def _getmethods(obj, methods):
  564.     # Helper to get a list of methods from an object
  565.     # Adds names to dictionary argument 'methods'
  566.     for name in dir(obj):
  567.         attr = getattr(obj, name)
  568.         if callable(attr):
  569.             methods[name] = 1
  570.     if type(obj) == types.InstanceType:
  571.         _getmethods(obj.__class__, methods)
  572.     if type(obj) == types.ClassType:
  573.         for super in obj.__bases__:
  574.             _getmethods(super, methods)
  575.  
  576. def _getattributes(obj, attributes):
  577.     for name in dir(obj):
  578.         attr = getattr(obj, name)
  579.         if not callable(attr):
  580.             attributes[name] = 1
  581.  
  582. class MethodProxy:
  583.  
  584.     def __init__(self, sockio, oid, name):
  585.         self.sockio = sockio
  586.         self.oid = oid
  587.         self.name = name
  588.  
  589.     def __call__(self, *args, **kwargs):
  590.         value = self.sockio.remotecall(self.oid, self.name, args, kwargs)
  591.         return value
  592.  
  593.  
  594. # XXX KBK 09Sep03  We need a proper unit test for this module.  Previously
  595. #                  existing test code was removed at Rev 1.27.
  596.